This document demonstrates how to use sparklyr with an Apache Spark cluster. Data are downloaded from the web and stored in Hive tables on HDFS across multiple worker nodes. RStudio Server is installed on the master node and orchestrates the analysis in Spark. Here is the basic workflow.
This demonstration uses Amazon Web Services (AWS), but it could just as easily use Microsoft, Google, or any other provider. We will use Elastic MapReduce (EMR) to set up a cluster with two core nodes and one master node. The nodes run on virtual servers from the Elastic Compute Cloud (EC2). Note: there is no free tier for EMR, so charges will apply.
Before beginning this setup we assume you have:
Before beginning the EMR wizard setup, make sure you create the following in AWS:
Make sure to select Hive and Spark as part of the install. Note that choosing Spark also installs R on the master node as part of the distribution.
Install two core nodes and one master node, using m3.xlarge instances with 80 GiB of storage per node. You can easily increase the number of nodes later.
Click Next on the General Cluster Settings page.
Enter your EC2 key pair and security group. Make sure the security group has port 22 (SSH) and port 8787 (RStudio Server) open.
The cluster page will give you details about your EMR cluster and instructions on connecting.
Connect to the master node via SSH using your key pair. Once connected, you will see the EMR welcome banner.
# Log in to master node
ssh -i ~/spark-demo.pem hadoop@ec2-52-10-102-11.us-west-2.compute.amazonaws.com
EMR uses Amazon Linux, which is based on CentOS. Update your master node and install the system dependencies that the R packages will need.
# Update
sudo yum update
sudo yum install libcurl-devel openssl-devel # used for devtools
Installing RStudio Server is straightforward. Download the preview version of RStudio Server and install it on the master node.
# Install RStudio Server
wget -P /tmp https://s3.amazonaws.com/rstudio-dailybuilds/rstudio-server-rhel-0.99.1266-x86_64.rpm
sudo yum install --nogpgcheck /tmp/rstudio-server-rhel-0.99.1266-x86_64.rpm
Create a user called rstudio-user that will perform the data analysis. Create a user directory for rstudio-user on HDFS with the hadoop fs command.
# Make User
sudo useradd -m rstudio-user
sudo passwd rstudio-user
# Create new directory in hdfs
hadoop fs -mkdir /user/rstudio-user
hadoop fs -chown rstudio-user:hadoop /user/rstudio-user
hadoop fs -chmod 777 /user/rstudio-user
The flights data is a well-known data source representing 123 million flights over 22 years. It consumes roughly 12 GiB of storage in uncompressed CSV format, split into one file per year.
For data loading and analysis, make sure you are logged in as a regular user.
# Create the HDFS directory for the new user (-p: no error if it already exists)
hadoop fs -mkdir -p /user/rstudio-user
hadoop fs -chmod 777 /user/rstudio-user
# switch user
su rstudio-user
Run the following script to download data from the web onto your master node. Download the yearly flight data and the airlines lookup table.
# Make download directory
mkdir /tmp/flights
# Download flight data by year
for i in {1987..2008}
do
echo "$(date) $i Download"
fnam=$i.csv.bz2
wget -O /tmp/flights/$fnam http://stat-computing.org/dataexpo/2009/$fnam
echo "$(date) $i Unzip"
bunzip2 /tmp/flights/$fnam
done
# Download airline carrier data
wget -O /tmp/airlines.csv "http://www.transtats.bts.gov/Download_Lookup.asp?Lookup=L_UNIQUE_CARRIERS"
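For readers who prefer to stage the files from R, here is a minimal sketch of the same naming pattern the shell loop uses. It assumes the yearly files are still served at the stat-computing.org address above (they may have moved since). The helper names `flight_files` and `download_flights` are hypothetical, and because downloading needs network access, the download helper is only defined here, not called.

```r
# Build the source URL and local destination for each yearly file,
# mirroring the shell loop (years 1987..2008, files named <year>.csv.bz2).
flight_files <- function(years = 1987:2008, dest_dir = "/tmp/flights") {
  fnam <- sprintf("%d.csv.bz2", years)
  data.frame(
    year = years,
    url  = paste0("http://stat-computing.org/dataexpo/2009/", fnam),
    dest = file.path(dest_dir, fnam),
    stringsAsFactors = FALSE
  )
}

# Download every file in binary mode (requires network access; not run here).
# R itself can read the compressed files directly, e.g. read.csv(bzfile(path)).
download_flights <- function(files = flight_files()) {
  for (d in unique(dirname(files$dest))) {
    dir.create(d, showWarnings = FALSE, recursive = TRUE)
  }
  for (i in seq_len(nrow(files))) {
    message(Sys.time(), " ", files$year[i], " Download")
    utils::download.file(files$url[i], files$dest[i], mode = "wb")
  }
  invisible(files$dest)
}

files <- flight_files()
nrow(files)   # 22 yearly files
files$url[1]
```

Calling `download_flights()` would fetch all 22 archives. They would still need `bunzip2`, as in the shell loop, before being copied to HDFS.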
Copy data into HDFS using the hadoop fs command.
# Copy flight data to HDFS
hadoop fs -mkdir /user/rstudio-user/flights/
hadoop fs -put /tmp/flights /user/rstudio-user/
# Copy airline data to HDFS
hadoop fs -mkdir /user/rstudio-user/airlines/
hadoop fs -put /tmp/airlines.csv /user/rstudio-user/airlines
Launch Hive from the command line.
# Open Hive prompt
hive
Create the metadata that will structure the flights table. Load data into the Hive table.
-- Create metadata
CREATE EXTERNAL TABLE IF NOT EXISTS flights
(
year int,
month int,
dayofmonth int,
dayofweek int,
deptime int,
crsdeptime int,
arrtime int,
crsarrtime int,
uniquecarrier string,
flightnum int,
tailnum string,
actualelapsedtime int,
crselapsedtime int,
airtime string,
arrdelay int,
depdelay int,
origin string,
dest string,
distance int,
taxiin string,
taxiout string,
cancelled int,
cancellationcode string,
diverted int,
carrierdelay string,
weatherdelay string,
nasdelay string,
securitydelay string,
lateaircraftdelay string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
TBLPROPERTIES("skip.header.line.count"="1");
-- Load data into table
LOAD DATA INPATH '/user/rstudio-user/flights' INTO TABLE flights;
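`LOAD DATA INPATH` only moves the files, and Hive applies the column types when the table is queried. For that reason it can be worth checking a yearly file against the column list before loading it. Below is a minimal R sketch, assuming you have a local copy of one yearly CSV. The vector `hive_schema` and the function `read_flights_sample` are hypothetical names. The sketch restates the Hive types, reading `int` columns as R integers and `string` columns as character. It treats the first line as a header, as `skip.header.line.count` does.

```r
# Column names and Hive types, copied from the CREATE EXTERNAL TABLE statement.
hive_schema <- c(
  year = "int", month = "int", dayofmonth = "int", dayofweek = "int",
  deptime = "int", crsdeptime = "int", arrtime = "int", crsarrtime = "int",
  uniquecarrier = "string", flightnum = "int", tailnum = "string",
  actualelapsedtime = "int", crselapsedtime = "int", airtime = "string",
  arrdelay = "int", depdelay = "int", origin = "string", dest = "string",
  distance = "int", taxiin = "string", taxiout = "string", cancelled = "int",
  cancellationcode = "string", diverted = "int", carrierdelay = "string",
  weatherdelay = "string", nasdelay = "string", securitydelay = "string",
  lateaircraftdelay = "string"
)

# Read the first rows of a yearly file with the same column types.
# Literal "NA" values in the raw data become missing values.
read_flights_sample <- function(file, nrows = 1000) {
  col_classes <- ifelse(hive_schema == "int", "integer", "character")
  utils::read.csv(
    file,
    header     = TRUE,                # first line is the header row
    col.names  = names(hive_schema),  # use the Hive column names
    colClasses = unname(col_classes),
    na.strings = "NA",
    nrows      = nrows
  )
}
```

For example, `str(read_flights_sample("/tmp/flights/2007.csv"))` on the master node shows whether every column parses with the intended type.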
Create the metadata that will structure the airlines table. Load data into the Hive table.
-- Create metadata
CREATE EXTERNAL TABLE IF NOT EXISTS airlines
(
Code string,
Description string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES
(
"separatorChar" = '\,',
"quoteChar" = '\"'
)
STORED AS TEXTFILE
tblproperties("skip.header.line.count"="1");
-- Load data into table
LOAD DATA INPATH '/user/rstudio-user/airlines' INTO TABLE airlines;
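The airlines table uses `OpenCSVSerde` with a `quoteChar` instead of `ROW FORMAT DELIMITED`, presumably because the lookup file wraps its fields in double quotes, and a quoted field could also contain a comma. A plain comma split would keep the quotes, so a code read as `"AA"` would not match `AA` in the flights table during the join. The base R sketch below contrasts a naive split with a quote-aware CSV reader on two made-up lookup rows.

```r
# Hypothetical rows in the same quoted layout as the carrier lookup file.
lookup_lines <- c(
  '"Code","Description"',
  '"AA","American Airlines Inc."',
  '"XX","Example Air, Inc."'   # made-up entry with a comma inside quotes
)

# Naive split on commas: quotes survive and the quoted comma splits a field.
naive <- strsplit(lookup_lines[-1], ",", fixed = TRUE)

# Quote-aware parse, analogous to OpenCSVSerde with quoteChar = '"'.
airlines <- utils::read.csv(text = lookup_lines, stringsAsFactors = FALSE)

naive[[1]]             # quotes kept in both fields
lengths(naive)         # 2 3: the second row is split into three pieces
airlines$Code          # "AA" "XX"
airlines$Description[2]
```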
Log in to RStudio Server as rstudio-user by pointing a browser at port 8787 on your master node (http://<master-node-IP>:8787).
Set the environment variable SPARK_HOME and then run spark_connect. After connecting you will be able to browse the Hive metadata in the RStudio Server Spark pane.
# Connect to Spark
library(sparklyr)
library(dplyr)
library(ggplot2)
Sys.setenv(SPARK_HOME="/usr/lib/spark")
config <- spark_config()
config[["spark.executor.memory"]] <- "8G"
sc <- spark_connect(master = "yarn-client", config = config, version = '1.6.2')
Once you are connected, you will see the Spark pane appear along with your Hive tables.
You can inspect your tables by clicking on the data icon.
Is there evidence to suggest that some airline carriers make up time in flight? This analysis predicts time gained in flight by airline carrier.
Use tbl_cache to load the flights table into memory. Caching tables will make analysis much faster. Create a dplyr reference to the Spark DataFrame.
# Cache flights Hive table into Spark
tbl_cache(sc, 'flights')
flights_tbl <- tbl(sc, 'flights')
# Cache airlines Hive table into Spark
tbl_cache(sc, 'airlines')
airlines_tbl <- tbl(sc, 'airlines')
Filter the data to contain only the records to be used in the fitted model. Join carrier descriptions for reference. Create a new variable called gain which represents the amount of time gained (or lost) in flight.
# Filter records and create target variable 'gain'
model_data <- flights_tbl %>%
filter(!is.na(arrdelay) & !is.na(depdelay) & !is.na(distance)) %>%
filter(depdelay > 15 & depdelay < 240) %>%
filter(arrdelay > -60 & arrdelay < 360) %>%
filter(year >= 2003 & year <= 2007) %>%
left_join(airlines_tbl, by = c("uniquecarrier" = "code")) %>%
mutate(gain = depdelay - arrdelay) %>%
select(year, month, arrdelay, depdelay, distance, uniquecarrier, description, gain)
# Summarize data by carrier
model_data %>%
group_by(uniquecarrier) %>%
summarize(description = min(description), gain=mean(gain),
distance=mean(distance), depdelay=mean(depdelay)) %>%
select(description, gain, distance, depdelay) %>%
arrange(gain)
Source: query [?? x 4]
Database: spark connection master=yarn-client app=sparklyr local=FALSE
description gain distance depdelay
<chr> <dbl> <dbl> <dbl>
1 ATA Airlines d/b/a ATA -3.3480120 1134.7084 56.06583
2 ExpressJet Airlines Inc. (1) -3.0326180 519.7125 59.41659
3 Envoy Air -2.5434415 416.3716 53.12529
4 Northwest Airlines Inc. -2.2030586 779.2342 48.52828
5 Delta Air Lines Inc. -1.8248026 868.3997 50.77174
6 AirTran Airways Corporation -1.4331555 641.8318 54.96702
7 Continental Air Lines Inc. -0.9617003 1116.6668 57.00553
8 American Airlines Inc. -0.8860262 1074.4388 55.45045
9 Endeavor Air Inc. -0.6392733 467.1951 58.47395
10 JetBlue Airways -0.3262134 1139.0443 54.06156
# ... with more rows
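sparklyr translates the dplyr pipeline above into Spark SQL, so the same logic can be checked on a small local data frame first. The sketch below uses a handful of made-up flights and base R only (`subset`, `merge`, `aggregate`). It applies the same filters, the left join on carrier code, and `gain = depdelay - arrdelay`, where a positive gain means the flight arrived less late than it departed.

```r
# Made-up flights (not from the real data set).
flights <- data.frame(
  year          = c(2004, 2005, 2006, 2007, 2008, 2005),
  uniquecarrier = c("AA", "AA", "WN", "WN", "WN", "ZZ"),
  depdelay      = c(30, 20, 45, 60, 25, 10),
  arrdelay      = c(35, 18, 40, 50, 20, NA),
  distance      = c(1200, 900, 400, 600, 500, 300),
  stringsAsFactors = FALSE
)
airlines <- data.frame(
  code        = c("AA", "WN"),
  description = c("American Airlines Inc.", "Southwest Airlines Co."),
  stringsAsFactors = FALSE
)

# Same filters as the Spark pipeline (subset() drops rows where the test is NA).
model_data <- subset(
  flights,
  !is.na(arrdelay) & !is.na(depdelay) & !is.na(distance) &
    depdelay > 15 & depdelay < 240 &
    arrdelay > -60 & arrdelay < 360 &
    year >= 2003 & year <= 2007
)

# Left join on carrier code, then compute the time gained in flight.
model_data <- merge(model_data, airlines,
                    by.x = "uniquecarrier", by.y = "code", all.x = TRUE)
model_data$gain <- model_data$depdelay - model_data$arrdelay

# Mean gain, distance, and departure delay per carrier, lowest gain first.
by_carrier <- aggregate(cbind(gain, distance, depdelay) ~ description,
                        data = model_data, FUN = mean)
by_carrier[order(by_carrier$gain), ]
```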
Predict time gained or lost in flight as a function of distance, departure delay, and airline carrier.
# Partition the data into training and validation sets
model_partition <- model_data %>%
sdf_partition(train = 0.5, valid = 0.5, seed = 5555)
# Fit a linear model
ml1 <- model_partition$train %>%
ml_linear_regression(gain ~ distance + depdelay + uniquecarrier)
# Summarize the linear model
summary(ml1)
Deviance Residuals: (approximate):
Min 1Q Median 3Q Max
-278.989 -5.531 2.717 9.763 169.343
Coefficients:
Estimate Std. Error t value Pr(>|t|)
(Intercept) -1.16170874 0.12951174 -8.9699 < 2.2e-16 ***
distance 0.00326516 0.00002115 154.3822 < 2.2e-16 ***
depdelay -0.01450404 0.00025757 -56.3120 < 2.2e-16 ***
uniquecarrierAA -2.39254076 0.13299650 -17.9895 < 2.2e-16 ***
uniquecarrierAQ 2.87415012 0.36446876 7.8859 3.109e-15 ***
uniquecarrierAS 0.83554860 0.14287016 5.8483 4.967e-09 ***
uniquecarrierB6 -2.07689984 0.14827271 -14.0073 < 2.2e-16 ***
uniquecarrierCO -2.63650102 0.13911623 -18.9518 < 2.2e-16 ***
uniquecarrierDH 2.04688357 0.14680358 13.9430 < 2.2e-16 ***
uniquecarrierDL -2.75750377 0.13426735 -20.5374 < 2.2e-16 ***
uniquecarrierEV 1.82241085 0.13556762 13.4428 < 2.2e-16 ***
uniquecarrierF9 -1.00496823 0.18206584 -5.5198 3.394e-08 ***
uniquecarrierFL -1.59329948 0.14012820 -11.3703 < 2.2e-16 ***
uniquecarrierHA 0.03845925 0.31586759 0.1218 0.9031
uniquecarrierHP 1.96885060 0.15611847 12.6113 < 2.2e-16 ***
uniquecarrierMQ -1.96236988 0.13334682 -14.7163 < 2.2e-16 ***
uniquecarrierNW -2.85675733 0.13589410 -21.0219 < 2.2e-16 ***
uniquecarrierOH 0.71608066 0.13947522 5.1341 2.835e-07 ***
uniquecarrierOO 0.53303490 0.13497744 3.9491 7.846e-05 ***
uniquecarrierTZ -5.29969902 0.20143394 -26.3099 < 2.2e-16 ***
uniquecarrierUA -0.73430558 0.13445784 -5.4612 4.729e-08 ***
uniquecarrierUS -0.68272488 0.13486704 -5.0622 4.145e-07 ***
uniquecarrierWN 3.77853692 0.13096010 28.8526 < 2.2e-16 ***
uniquecarrierXE -2.65574134 0.13620414 -19.4982 < 2.2e-16 ***
uniquecarrierYV 3.03446782 0.14744659 20.5801 < 2.2e-16 ***
---
Signif. codes: 0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1
R-Squared: 0.0237
Root Mean Squared Error: 17.75
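The coefficient table uses dummy coding for the categorical carrier term. Each `uniquecarrierXX` estimate is that carrier's shift in expected gain relative to a reference carrier absorbed into the intercept, holding distance and departure delay fixed. Note also that an R-squared of 0.0237 means the model explains only about 2% of the variance in gain. The base R sketch below fits the same formula with `lm` on simulated data, together with a random 50/50 split similar in spirit to `sdf_partition`. The carriers `AA`, `BB`, `CC`, their effects, and the noise level are all invented. Which level serves as the reference may differ between R and Spark.

```r
set.seed(5555)

# Simulated flights (invented): gain depends on distance, departure delay,
# and carrier.
n <- 3000
sim <- data.frame(
  distance      = runif(n, 100, 2500),
  depdelay      = runif(n, 16, 239),
  uniquecarrier = sample(c("AA", "BB", "CC"), n, replace = TRUE),
  stringsAsFactors = FALSE
)
carrier_effect <- c(AA = 0, BB = -2, CC = 3)   # invented effects in minutes
sim$gain <- -1 + 0.003 * sim$distance - 0.015 * sim$depdelay +
  unname(carrier_effect[sim$uniquecarrier]) + rnorm(n, sd = 5)

# Random 50/50 split into training and validation sets.
is_train <- runif(n) < 0.5
train <- sim[is_train, ]
valid <- sim[!is_train, ]

# With R's default treatment coding, "AA" (the first level) is the reference.
fit <- lm(gain ~ distance + depdelay + uniquecarrier, data = train)
round(coef(fit), 3)
```

The estimates for `uniquecarrierBB` and `uniquecarrierCC` should land near the invented effects of -2 and 3. Each is a difference from carrier AA, not an absolute gain.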
Compare the model performance using the validation data.
# Calculate average gains by predicted decile
model_deciles <- lapply(model_partition, function(x) {
sdf_predict(ml1, x) %>%
mutate(decile = ntile(prediction, 10)) %>%
group_by(decile) %>%
summarize(gain = mean(gain)) %>%
select(decile, gain) %>%
collect()
})
# Create a summary dataset for plotting
deciles <- rbind(
data.frame(data = 'train', model_deciles$train),
data.frame(data = 'valid', model_deciles$valid),
make.row.names = FALSE
)
# Plot average gains by predicted decile
deciles %>%
ggplot(aes(factor(decile), gain, fill = data)) +
geom_bar(stat = 'identity', position = 'dodge') +
labs(title = 'Average gain by predicted decile', x = 'Decile', y = 'Minutes')
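`ntile(prediction, 10)` ranks rows by predicted gain and cuts them into ten groups of (nearly) equal size. If the model has signal, the average actual gain should rise from decile 1 to decile 10 in both the training and validation sets. Below is a base R sketch of that calculation on made-up predictions. The helper `ntile_base` is hypothetical and similar in spirit to `ntile`; Spark SQL's handling of ties and uneven group sizes may differ.

```r
# Assign each value to one of n near-equal groups by rank (1 = lowest).
# Ties are broken by position; NAs are not handled in this sketch.
ntile_base <- function(x, n) {
  r <- rank(x, ties.method = "first")
  as.integer(floor(n * (r - 1) / length(x)) + 1)
}

set.seed(42)
prediction <- rnorm(200)                     # made-up model predictions
gain <- 2 * prediction + rnorm(200, sd = 1)  # made-up actual gains with signal

decile <- ntile_base(prediction, 10)
avg_gain <- tapply(gain, decile, mean)       # average actual gain per decile

table(decile)                                # 20 rows in each decile
round(avg_gain, 2)
```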
Compare actual gains to predicted gains for an out-of-time sample.
# Select data from an out-of-time sample
data_2008 <- flights_tbl %>%
filter(!is.na(arrdelay) & !is.na(depdelay) & !is.na(distance)) %>%
filter(depdelay > 15 & depdelay < 240) %>%
filter(arrdelay > -60 & arrdelay < 360) %>%
filter(year == 2008) %>%
left_join(airlines_tbl, by = c("uniquecarrier" = "code")) %>%
mutate(gain = depdelay - arrdelay) %>%
select(year, month, arrdelay, depdelay, distance, uniquecarrier, description, gain)
# Summarize data by carrier
carrier <- sdf_predict(ml1, data_2008) %>%
group_by(description) %>%
summarize(gain = mean(gain), prediction = mean(prediction), freq = n()) %>%
filter(freq > 10000) %>%
collect()
# Plot actual gains and predicted gains by airline carrier
ggplot(carrier, aes(gain, prediction)) +
geom_point(alpha = 0.75, color = 'red', shape = 3) +
geom_abline(intercept = 0, slope = 1, alpha = 0.15, color = 'blue') +
geom_text(aes(label = substr(description, 1, 20)), size = 3, alpha = 0.75, vjust = -1) +
labs(title='Average Gains Forecast', x = 'Actual', y = 'Predicted')
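The out-of-time check trains on 2003-2007, scores 2008, and compares average actual and predicted gain per carrier. Points near the 45-degree reference line (`geom_abline`) are carriers the model forecasts well, and the `freq > 10000` filter drops carriers with too few flights for a stable average. The base R sketch below does the same bookkeeping on simulated data. The carrier names and effects are invented, and the frequency cutoff is scaled down to suit the small sample.

```r
set.seed(2008)

# Simulated flights for 2003-2008 (invented carriers and effects).
n <- 6000
sim <- data.frame(
  year          = sample(2003:2008, n, replace = TRUE),
  distance      = runif(n, 100, 2500),
  depdelay      = runif(n, 16, 239),
  uniquecarrier = sample(c("AA", "BB", "CC", "DD"), n, replace = TRUE,
                         prob = c(0.4, 0.3, 0.29, 0.01)),
  stringsAsFactors = FALSE
)
effect <- c(AA = 0, BB = -2, CC = 3, DD = 1)
sim$gain <- -1 + 0.003 * sim$distance - 0.015 * sim$depdelay +
  unname(effect[sim$uniquecarrier]) + rnorm(n, sd = 5)

# Fit on the earlier years, then score the held-out later year.
fit <- lm(gain ~ distance + depdelay + uniquecarrier,
          data = subset(sim, year <= 2007))
holdout <- subset(sim, year == 2008)
holdout$prediction <- predict(fit, newdata = holdout)

# Average actual vs. predicted gain per carrier, plus flight counts.
carrier <- aggregate(cbind(gain, prediction) ~ uniquecarrier,
                     data = holdout, FUN = mean)
freq <- table(holdout$uniquecarrier)
carrier$freq <- as.vector(freq[as.character(carrier$uniquecarrier)])

# Keep well-populated carriers (cutoff scaled down for this small sample).
carrier <- subset(carrier, freq > 100)
carrier
```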
Some carriers make up more time in flight than others, but the differences are relatively small: the gap in average time gained between the best and worst carriers is only about six minutes. The best predictor of time gained is not the carrier but flight distance; the biggest gains were associated with the longest flights.